package cn.mabach.portal.kafka;

import cn.mabach.portal.dianping.mongo.ShopTfIdf;
import cn.mabach.portal.dianping.mongo.WordInshop;
import cn.mabach.portal.dianping.mongo.WordScore;
import cn.mabach.portal.dianping.mongo.WordTfIdfInShop;
import cn.mabach.portal.kafka.mongoEntity.ShopAccess;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Real-time recommendation based on user and shop profiles.
 *
 * <p>Listens on Kafka topic {@code topic5} for access-log messages of the form
 * {@code "... aclog_ <uid> <action> <shopId>"}. For each message it:
 * <ol>
 *   <li>updates the user's per-shop {@link ShopAccess} record kept in Redis under the key
 *       {@code uid} (30 minute TTL), keeping only the 20 most recently touched shops;</li>
 *   <li>builds a word-weight profile for the user from the {@link ShopTfIdf} documents of
 *       those shops;</li>
 *   <li>scores candidate shops through the {@link WordInshop} documents of the profile words;</li>
 *   <li>stores the ids of the 100 best-scoring shops in Redis under {@code "SIMILAR_" + uid}
 *       (1 day TTL).</li>
 * </ol>
 *
 * @Author: ming
 * @Date: 2020/3/26 4:27 PM
 */
@Component
@Slf4j
public class Consumer2 {

    /** Marker that identifies an access-log message. */
    private static final String LOG_MARKER = "aclog_";
    /** Delimiter (marker plus one space) that precedes the space-separated payload. */
    private static final String LOG_MARKER_DELIMITER = "aclog_ ";
    /** Redis key prefix for the stored recommendation list. */
    private static final String SIMILAR_KEY_PREFIX = "SIMILAR_";
    /** Number of most recently accessed shops kept per user. */
    private static final int MAX_RECENT_SHOPS = 20;
    /** Number of recommended shop ids stored per user. */
    private static final int MAX_RECOMMENDATIONS = 100;
    /** TTL, in minutes, of the per-user access map in Redis. */
    private static final long ACCESS_TTL_MINUTES = 30;
    /** TTL, in days, of the per-user recommendation list in Redis. */
    private static final long RECOMMENDATION_TTL_DAYS = 1;

    /** Orders access entries by timestamp, newest first. */
    private static final Comparator<Map.Entry<Integer, ShopAccess>> MOST_RECENT_FIRST =
            new Comparator<Map.Entry<Integer, ShopAccess>>() {
                @Override
                public int compare(Map.Entry<Integer, ShopAccess> a, Map.Entry<Integer, ShopAccess> b) {
                    return Long.compare(b.getValue().getDate(), a.getValue().getDate());
                }
            };

    /** Orders scored entries by score, highest first. */
    private static final Comparator<Map.Entry<Integer, Double>> HIGHEST_SCORE_FIRST =
            new Comparator<Map.Entry<Integer, Double>>() {
                @Override
                public int compare(Map.Entry<Integer, Double> a, Map.Entry<Integer, Double> b) {
                    return Double.compare(b.getValue(), a.getValue());
                }
            };

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Handles one Kafka record: updates the user's recent-access state and recomputes the
     * recommendation list for that user.
     *
     * <p>Records without a value, without the {@code aclog_} marker, with an empty uid, or with
     * a malformed payload are skipped (malformed payloads are logged at WARN) instead of
     * throwing, since re-delivering an unparsable message cannot succeed.
     *
     * @param record the consumed record; its value's {@code toString()} is parsed
     * @throws Exception declared for compatibility with the original signature
     */
    @KafkaListener(topics = "topic5")
    public void listen(ConsumerRecord<?, ?> record) throws Exception {
        Object value = record.value();
        if (value == null) {
            return;
        }
        String message = value.toString();
        if (!message.contains(LOG_MARKER)) {
            return;
        }

        // Payload is whatever follows "aclog_ ": "<uid> <action> <shopId>".
        String[] segments = message.split(LOG_MARKER_DELIMITER);
        if (segments.length < 2) {
            log.warn("Skipping access log without payload after marker: {}", message);
            return;
        }
        String[] fields = segments[1].split(" ");
        if (fields.length < 3) {
            log.warn("Skipping access log with fewer than 3 fields: {}", message);
            return;
        }
        String uid = fields[0];
        String access = fields[1];
        if (!StringUtils.hasLength(uid)) {
            return;
        }
        Integer shopId;
        try {
            shopId = Integer.valueOf(fields[2]);
        } catch (NumberFormatException e) {
            log.warn("Skipping access log with non-numeric shop id '{}': {}", fields[2], message);
            return;
        }

        // Create the ShopAccess for this shop on first sight, otherwise update the existing one.
        Map<Integer, ShopAccess> accesses = loadAccesses(uid);
        ShopAccess current = accesses.get(shopId);
        if (current == null) {
            current = new ShopAccess();
        }
        accesses.put(shopId, setuac(current, access));

        // Keep only the most recently touched shops, newest first, and persist them once.
        List<Map.Entry<Integer, ShopAccess>> byRecency = new ArrayList<>(accesses.entrySet());
        Collections.sort(byRecency, MOST_RECENT_FIRST);
        List<Map.Entry<Integer, ShopAccess>> recent = firstN(byRecency, MAX_RECENT_SHOPS);

        LinkedHashMap<Integer, ShopAccess> recentMap = new LinkedHashMap<>();
        for (Map.Entry<Integer, ShopAccess> entry : recent) {
            recentMap.put(entry.getKey(), entry.getValue());
        }
        store(uid, recentMap, ACCESS_TTL_MINUTES, TimeUnit.MINUTES);

        // User profile -> candidate shop scores -> best-scoring shop ids.
        Map<Integer, Double> wordWeights = buildWordWeights(recent);
        Map<Integer, Double> shopScores = scoreShops(wordWeights);

        List<Map.Entry<Integer, Double>> ranked = new ArrayList<>(shopScores.entrySet());
        Collections.sort(ranked, HIGHEST_SCORE_FIRST);

        ArrayList<Integer> recommendedIds = new ArrayList<>();
        for (Map.Entry<Integer, Double> entry : firstN(ranked, MAX_RECOMMENDATIONS)) {
            recommendedIds.add(entry.getKey());
        }

        store(SIMILAR_KEY_PREFIX + uid, recommendedIds, RECOMMENDATION_TTL_DAYS, TimeUnit.DAYS);
        log.info("推荐商品长度：{}", recommendedIds.size());
    }

    /**
     * Reads the user's shop-access map from Redis.
     *
     * @param uid the Redis key (user id)
     * @return the cached map, or a new empty mutable map when nothing is cached
     */
    @SuppressWarnings("unchecked") // raw RedisTemplate: the value type is only known by convention
    private Map<Integer, ShopAccess> loadAccesses(String uid) {
        Map<Integer, ShopAccess> cached =
                (Map<Integer, ShopAccess>) redisTemplate.boundValueOps(uid).get();
        return cached != null ? cached : new HashMap<Integer, ShopAccess>();
    }

    /** Writes {@code value} to Redis under {@code key} with the given time-to-live. */
    @SuppressWarnings("unchecked") // raw RedisTemplate
    private void store(String key, Object value, long timeout, TimeUnit unit) {
        redisTemplate.boundValueOps(key).set(value, timeout, unit);
    }

    /** Returns the first {@code limit} elements of {@code items}, or all of them if fewer. */
    private static <T> List<T> firstN(List<T> items, int limit) {
        return items.size() > limit ? items.subList(0, limit) : items;
    }

    /**
     * Builds the user's word-weight profile from the recently accessed shops.
     *
     * <p>For every word of every shop the weight is {@code tfIdf * shopWeight}; when a word
     * occurs in several shops the largest weight is kept. Shops without a {@link ShopTfIdf}
     * document, and words with a missing or zero TF-IDF, are skipped.
     *
     * @param recent recent access entries (shop id to access record)
     * @return word id to weight
     */
    private Map<Integer, Double> buildWordWeights(List<Map.Entry<Integer, ShopAccess>> recent) {
        Map<Integer, Double> wordWeights = new HashMap<>();
        for (Map.Entry<Integer, ShopAccess> entry : recent) {
            double shopWeight = getShopWeight(entry.getValue());
            // findById yields null when no document exists for this shop id.
            ShopTfIdf shopTfIdf = mongoTemplate.findById(entry.getKey(), ShopTfIdf.class);
            if (shopTfIdf == null) {
                continue;
            }
            List<WordScore> wordScores = shopTfIdf.getWordScore();
            if (CollectionUtils.isEmpty(wordScores)) {
                continue;
            }
            for (WordScore wordScore : wordScores) {
                Double tfIdf = wordScore.get_2();
                if (tfIdf == null || tfIdf.equals(0.0)) {
                    continue;
                }
                Integer wordId = wordScore.get_1();
                double weight = tfIdf * shopWeight;
                Double previous = wordWeights.get(wordId);
                // Keep the maximum weight seen for a word across shops.
                if (previous != null && previous > weight) {
                    continue;
                }
                wordWeights.put(wordId, weight);
            }
        }
        return wordWeights;
    }

    /**
     * Scores candidate shops against the user's word-weight profile.
     *
     * <p>For every profile word, each shop listed in the word's {@link WordInshop} document
     * receives {@code wordWeight * tfIdf}; contributions from different words are summed.
     *
     * @param wordWeights word id to user-profile weight
     * @return shop id to accumulated score
     */
    private Map<Integer, Double> scoreShops(Map<Integer, Double> wordWeights) {
        Map<Integer, Double> shopScores = new HashMap<>();
        for (Map.Entry<Integer, Double> entry : wordWeights.entrySet()) {
            Double wordWeight = entry.getValue();
            WordInshop wordInshop = mongoTemplate.findById(entry.getKey(), WordInshop.class);
            if (wordInshop == null || wordInshop.getWordTfIdfInShop() == null) {
                continue;
            }
            for (WordTfIdfInShop posting : wordInshop.getWordTfIdfInShop()) {
                Integer shopId = posting.getSid();
                Double tfIdf = posting.getScore();
                if (tfIdf == null) {
                    continue;
                }
                double contribution = tfIdf * wordWeight;
                Double running = shopScores.get(shopId);
                // Accumulate: the original overwrote the running sum with the last contribution.
                shopScores.put(shopId, running == null ? contribution : running + contribution);
            }
        }
        return shopScores;
    }

    /**
     * Computes how strongly a shop counts towards the user profile.
     *
     * <p>Action points (click 1, collect 2, share 2, buy 3, pay 5) are summed for the flags that
     * are set, then multiplied by a time decay {@code ln(1 + 1 / (1 + hours))}, where
     * {@code hours} is the whole number of hours since the record's timestamp, and by 0.1.
     *
     * @param shopAccess the access record; its date must be set
     * @return the shop weight
     */
    private double getShopWeight(ShopAccess shopAccess) {
        // Boolean.TRUE.equals is null-safe. ShopAccess is not visible here; if its flags are
        // boxed Booleans left unset, a plain "== true" comparison would throw on unboxing.
        int points = 0;
        if (Boolean.TRUE.equals(shopAccess.getClick())) {
            points += 1;
        }
        if (Boolean.TRUE.equals(shopAccess.getCollect())) {
            points += 2;
        }
        if (Boolean.TRUE.equals(shopAccess.getShare())) {
            points += 2;
        }
        if (Boolean.TRUE.equals(shopAccess.getBuy())) {
            points += 3;
        }
        if (Boolean.TRUE.equals(shopAccess.getPay())) {
            points += 5;
        }

        long elapsedMillis = System.currentTimeMillis() - shopAccess.getDate();
        // Clamp at zero so a timestamp in the future cannot produce Infinity or NaN below.
        long hours = Math.max(0L, TimeUnit.MILLISECONDS.toHours(elapsedMillis));
        double decay = Math.log(1.0 + 1.0 / (1.0 + hours));

        return points * decay * 0.1;
    }

    /**
     * Stamps the record with the current time and sets the flag matching the action.
     *
     * <p>Recognised actions are {@code click}, {@code collect}, {@code share}, {@code buy} and
     * {@code pay}; any other value only refreshes the timestamp.
     *
     * @param shopAccess the record to update (mutated in place)
     * @param anObject   the action name from the log message
     * @return the same {@code shopAccess} instance
     */
    private ShopAccess setuac(ShopAccess shopAccess, String anObject) {
        shopAccess.setDate(System.currentTimeMillis());
        if ("click".equals(anObject)) {
            shopAccess.setClick(true);
        } else if ("collect".equals(anObject)) {
            shopAccess.setCollect(true);
        } else if ("share".equals(anObject)) {
            shopAccess.setShare(true);
        } else if ("buy".equals(anObject)) {
            shopAccess.setBuy(true);
        } else if ("pay".equals(anObject)) {
            shopAccess.setPay(true);
        }
        return shopAccess;
    }

}
